HI3861学习笔记(24)

您所在的位置:网站首页 linux dup! HI3861学习笔记(24)

HI3861学习笔记(24)

2023-04-23 16:00| 来源: 网络整理| 查看: 265

一、MQTT简介 1.1 实现方式

实现MQTT协议需要客户端和服务器端通讯完成。在通讯过程中,MQTT协议中有三种身份:发布者(Publisher)、代理(Broker,即服务器)、订阅者(Subscriber)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

MQTT服务器的主要工作是数据分发,没有数据保存功能。
客户端可以订阅自己发布的主题,此时服务器会把消息原样回发给自己,可用于回环测试。
MQTT让逻辑变得更清晰,需要什么就订阅什么。
走标准化流程,省去了私有协议从制定、实现、调试到测试的一整套复杂流程。

1.2 Paho MQTT

Eclipse Paho项目是IBM在2011年建立的Eclipse开源项目,该项目包含以C、Java、Python、Javascript等语言编写的可用客户端。

二、移植Paho MQTT 2.1 下载源码

嵌入式C语言客户端开源地址:https://github.com/eclipse/paho.mqtt.embedded-c

下载之后解压,会得到这么一个文件夹:

MQTTClient:封装MQTTPacket生成的高级别C++客户端程序。
MQTTClient-C:封装MQTTPacket生成的高级别C客户端程序。其中 samples 目录提供FreeRTOS和Linux两个例程,分别支持FreeRTOS和Linux系统;src 目录提供MQTTClient的代码实现,以及用于移植到对应平台的网络驱动。
MQTTPacket:提供MQTT数据包的序列化与反序列化,以及部分辅助函数。

2.2 新建BUILD.gn

在鸿蒙系统源码的 third_party 文件夹下创建一个 paho_mqtt 文件夹,然后把解压后的所有文件都拷贝到 paho_mqtt 文件夹下

下一步,我们在 paho_mqtt 文件夹下面新建 BUILD.gn 文件,用来构建编译。

其内容如下:

# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import("//build/lite/config/component/lite_component.gni")
import("//build/lite/ndk/ndk.gni")

# Header search paths for the Paho MQTT port. Both library targets below
# list this config in public_configs.
config("pahomqtt_config") {
  include_dirs = [
    "MQTTPacket/src",
    "MQTTClient-C/src",
    "MQTTClient-C/src/liteOS",
    "//third_party/iot_link/network/mqtt/mqtt_al",
    "//third_party/iot_link/inc",
    "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
    "//kernel/liteos_m/components/cmsis/2.0",
  ]
}

# Warning suppressions for the upstream Paho sources.
# NOTE(review): this file-scope cflags list is not referenced by name in any
# target below - confirm that the lite_library template actually applies it.
cflags = [ "-Wno-unused-variable" ]
cflags += [ "-Wno-unused-but-set-variable" ]
cflags += [ "-Wno-unused-parameter" ]
cflags += [ "-Wno-sign-compare" ]
cflags += [ "-Wno-unused-function" ]
cflags += [ "-Wno-return-type" ]

# Source list shared by the static and shared library targets:
# the LiteOS platform layer, the MQTT client, and the MQTTPacket files.
pahomqtt_sources = [
  "MQTTClient-C/src/liteOS/MQTTLiteOS.c",
  "MQTTClient-C/src/MQTTClient.c",
  "MQTTPacket/src/MQTTConnectClient.c",
  "MQTTPacket/src/MQTTConnectServer.c",
  "MQTTPacket/src/MQTTDeserializePublish.c",
  "MQTTPacket/src/MQTTFormat.c",
  "MQTTPacket/src/MQTTPacket.c",
  "MQTTPacket/src/MQTTSerializePublish.c",
  "MQTTPacket/src/MQTTSubscribeClient.c",
  "MQTTPacket/src/MQTTSubscribeServer.c",
  "MQTTPacket/src/MQTTUnsubscribeClient.c",
  "MQTTPacket/src/MQTTUnsubscribeServer.c",
  "MQTTPacket/samples/transport.c",
]

# Static-library build of the sources above.
lite_library("pahomqtt_static") {
  target_type = "static_library"
  sources = pahomqtt_sources
  public_configs = [ ":pahomqtt_config" ]
}

# Shared-library build of the same sources.
lite_library("pahomqtt_shared") {
  target_type = "shared_library"
  sources = pahomqtt_sources
  public_configs = [ ":pahomqtt_config" ]
}
# NDK wrapper: depends on the static library when board_name is
# "hi3861v100", otherwise on the shared library with a ".so" extension.
ndk_lib("pahomqtt_ndk") {
  if (board_name != "hi3861v100") {
    lib_extension = ".so"
    deps = [ ":pahomqtt_shared" ]
  } else {
    deps = [ ":pahomqtt_static" ]
  }
  head_files = [ "//third_party/paho_mqtt" ]
}

2.3 创建LiteOS文件夹

Paho MQTT 已经提供了 Linux 和 FreeRTOS 的移植,这里我们参考它们,新建文件夹 third_party\paho_mqtt\MQTTClient-C\src\liteOS,里面存放两个文件:MQTTLiteOS.c 和 MQTTLiteOS.h。

内容如下:

2.3.1 MQTTLiteOS.c /******************************************************************************* * Copyright (c) 2014, 2015 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander - initial API and implementation and/or initial documentation * Ian Craggs - convert to FreeRTOS *******************************************************************************/ #include "MQTTLiteOS.h" int ThreadStart(Thread* thread, void (*fn)(void*), void* arg) { int rc = 0; thread = thread; osThreadAttr_t attr; attr.name = "MQTTTask"; attr.attr_bits = 0U; attr.cb_mem = NULL; attr.cb_size = 0U; attr.stack_mem = NULL; attr.stack_size = 2048; attr.priority = osThreadGetPriority(osThreadGetId()); rc = (int)osThreadNew((osThreadFunc_t)fn, arg, &attr); return rc; } void TimerInit(Timer* timer) { timer->end_time = (struct timeval){0, 0}; } char TimerIsExpired(Timer* timer) { struct timeval now, res; gettimeofday(&now, NULL); timersub(&timer->end_time, &now, &res); return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec end_time); } void TimerCountdown(Timer* timer, unsigned int timeout) { struct timeval now; gettimeofday(&now, NULL); struct timeval interval = {timeout, 0}; timeradd(&now, &interval, &timer->end_time); } int TimerLeftMS(Timer* timer) { struct timeval now, res; gettimeofday(&now, NULL); timersub(&timer->end_time, &now, &res); //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000); return (res.tv_sec < 0) ? 
0 : res.tv_sec * 1000 + res.tv_usec / 1000; } void MutexInit(Mutex* mutex) { mutex->sem = osSemaphoreNew(1, 1, NULL); } int MutexLock(Mutex* mutex) { return osSemaphoreAcquire(mutex->sem, LOS_WAIT_FOREVER); } int MutexUnlock(Mutex* mutex) { return osSemaphoreRelease(mutex->sem); } int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms) { struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000}; if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval)); int bytes = 0; while (bytes < len) { int rc = recv(n->my_socket, &buffer[bytes], (size_t)(len - bytes), 0); if (rc == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) bytes = -1; break; } else if (rc == 0) { bytes = 0; break; } else bytes += rc; } return bytes; } int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms) { struct timeval tv; tv.tv_sec = 0; /* 30 Secs Timeout */ tv.tv_usec = timeout_ms * 1000; // Not init'ing this can cause strange errors setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval)); int rc = send(n->my_socket, buffer, len, 0); return rc; } void NetworkInit(Network* n) { n->my_socket = 0; n->mqttread = linux_read; n->mqttwrite = linux_write; } int NetworkConnect(Network* n, char* addr, int port) { int type = SOCK_STREAM; struct sockaddr_in address; int rc = -1; sa_family_t family = AF_INET; struct addrinfo *result = NULL; struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0) { struct addrinfo* res = result; /* prefer ip4 addresses */ while (res) { if (res->ai_family == AF_INET) { result = res; break; } res = res->ai_next; } if (result->ai_family == AF_INET) { address.sin_port = htons(port); address.sin_family = family = AF_INET; address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr; } else rc = -1; 
freeaddrinfo(result); } if (rc == 0) { n->my_socket = socket(family, type, 0); if (n->my_socket != -1) rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address)); else rc = -1; } return rc; } void NetworkDisconnect(Network* n) { close(n->my_socket); } 2.3.2 MQTTLiteOS.h /******************************************************************************* * Copyright (c) 2014, 2015 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander - initial API and implementation and/or initial documentation *******************************************************************************/ #if !defined(MQTTLiteOS_H) #define MQTTLiteOS_H #include #if !defined(SOCKET_ERROR) /** error in socket operation */ #define SOCKET_ERROR -1 #endif #if defined(WIN32) /* default on Windows is 64 - increase to make Linux and Windows the same */ #define FD_SETSIZE 1024 #include #include #define MAXHOSTNAMELEN 256 #define EAGAIN WSAEWOULDBLOCK #define EINTR WSAEINTR #define EINVAL WSAEINVAL #define EINPROGRESS WSAEINPROGRESS #define EWOULDBLOCK WSAEWOULDBLOCK #define ENOTCONN WSAENOTCONN #define ECONNRESET WSAECONNRESET #define ioctl ioctlsocket #define socklen_t int #else #define INVALID_SOCKET SOCKET_ERROR #include #include #include #include #include #include #include #include #include #include #include #include #include #endif #if defined(WIN32) #include #else #include #include #endif #include "ohos_init.h" #include "cmsis_os2.h" #include "lwip/ip_addr.h" #include "lwip/netifapi.h" #include "lwip/sockets.h" #define MQTT_TASK typedef struct Thread { osThreadId_t task; } Thread; int 
ThreadStart(Thread*, void (*fn)(void*), void* arg); typedef struct Timer { struct timeval end_time; } Timer; typedef struct Mutex { osSemaphoreId_t sem; } Mutex; void MutexInit(Mutex*); int MutexLock(Mutex*); int MutexUnlock(Mutex*); void TimerInit(Timer*); char TimerIsExpired(Timer*); void TimerCountdownMS(Timer*, unsigned int); void TimerCountdown(Timer*, unsigned int); int TimerLeftMS(Timer*); typedef struct Network { int my_socket; int (*mqttread) (struct Network*, unsigned char*, int, int); int (*mqttwrite) (struct Network*, unsigned char*, int, int); } Network; int linux_read(Network*, unsigned char*, int, int); int linux_write(Network*, unsigned char*, int, int); void NetworkInit(Network*); int NetworkConnect(Network*, char*, int); void NetworkDisconnect(Network*); #endif 2.4 修改MQTTClient.c和MQTTClient.h

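打开 third_party\paho_mqtt\MQTTClient-C\src\MQTTClient.c 文件,这个文件也是我们主要移植的文件,我们需要实现 socket 相关的操作,包括发送、接收数据。需要注意的是,MQTTClient.c 本身只通过 Network 结构体中的 mqttread/mqttwrite 回调收发数据,具体的 socket 实现位于 MQTTLiteOS.c;MQTTClient.h 中则需要包含 liteOS/MQTTLiteOS.h。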

2.4.1 MQTTClient.c /******************************************************************************* * Copyright (c) 2014, 2017 IBM Corp. * * All rights reserved. This prog/******************************************************************************* * Copyright (c) 2014, 2017 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - fix for #96 - check rem_len in readPacket * Ian Craggs - add ability to set message handler separately #6 *******************************************************************************/ #include "MQTTClient.h" #include #include static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) { md->topicName = aTopicName; md->message = aMessage; } static int getNextPacketId(MQTTClient *c) { return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 
1 : c->next_packetid + 1; } static int sendPacket(MQTTClient* c, int length, Timer* timer) { int rc = FAILURE, sent = 0; while (sent < length && !TimerIsExpired(timer)) { rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer)); if (rc < 0) // there was an error writing the data break; sent += rc; } if (sent == length) { TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet rc = SUCCESS; } else rc = FAILURE; return rc; } void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) { int i; c->ipstack = network; for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) c->messageHandlers[i].topicFilter = 0; c->command_timeout_ms = command_timeout_ms; c->buf = sendbuf; c->buf_size = sendbuf_size; c->readbuf = readbuf; c->readbuf_size = readbuf_size; c->isconnected = 0; c->cleansession = 0; c->ping_outstanding = 0; c->defaultMessageHandler = NULL; c->next_packetid = 1; TimerInit(&c->last_sent); TimerInit(&c->last_received); #if defined(MQTT_TASK) MutexInit(&c->mutex); #endif } static int decodePacket(MQTTClient* c, int* value, int timeout) { unsigned char i; int multiplier = 1; int len = 0; const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; *value = 0; do { int rc = MQTTPACKET_READ_ERROR; if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { rc = MQTTPACKET_READ_ERROR; /* bad data */ goto exit; } rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout); if (rc != 1) goto exit; *value += (i & 127) * multiplier; multiplier *= 128; } while ((i & 128) != 0); exit: return len; } static int readPacket(MQTTClient* c, Timer* timer) { MQTTHeader header = {0}; int len = 0; int rem_len = 0; /* 1. read the header byte. This has the packet type in it */ int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)); if (rc != 1) goto exit; len = 1; /* 2. read the remaining length. 
This is variable in itself */ decodePacket(c, &rem_len, TimerLeftMS(timer)); len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ if ((unsigned int)rem_len > (c->readbuf_size - len)) { rc = BUFFER_OVERFLOW; goto exit; } /* 3. read the rest of the buffer using a callback to supply the rest of the data */ if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) { rc = 0; goto exit; } header.byte = c->readbuf[0]; rc = header.bits.type; if (c->keepAliveInterval > 0) TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet exit: return rc; } // assume topic filter and name is in correct format // # can only be at end // + and # can only be next to separator static char isTopicMatched(char* topicFilter, MQTTString* topicName) { char* curf = topicFilter; char* curn = topicName->lenstring.data; char* curn_end = curn + topicName->lenstring.len; while (*curf && curn < curn_end) { if (*curn == '/' && *curf != '/') break; if (*curf != '+' && *curf != '#' && *curf != *curn) break; if (*curf == '+') { // skip until we meet the next separator, or end of string char* nextpos = curn + 1; while (nextpos < curn_end && *nextpos != '/') nextpos = ++curn + 1; } else if (*curf == '#') curn = curn_end - 1; // skip until end of string curf++; curn++; }; return (curn == curn_end) && (*curf == '\0'); } int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) { int i; int rc = FAILURE; // we have to find the right message handler - indexed by topic for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) { if (c->messageHandlers[i].fp != NULL) { MessageData md; NewMessageData(&md, topicName, message); 
c->messageHandlers[i].fp(&md); rc = SUCCESS; } } } if (rc == FAILURE && c->defaultMessageHandler != NULL) { MessageData md; NewMessageData(&md, topicName, message); c->defaultMessageHandler(&md); rc = SUCCESS; } return rc; } int keepalive(MQTTClient* c) { int rc = SUCCESS; if (c->keepAliveInterval == 0) goto exit; // If we are waiting for a ping response, check if it has been too long if ( c->ping_outstanding == 1 ){ if ( TimerIsExpired(&c->pingresp_timer) ){ rc = FAILURE; /* PINGRESP not received in keepalive interval */ goto exit; } } else{ // If we have not sent or received anything in the timeout period, // send out a ping request if ( TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received) ) { Timer timer; TimerInit(&timer); TimerCountdownMS(&timer, 1000); int len = MQTTSerialize_pingreq(c->buf, c->buf_size); if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS){ // send the ping packet // send the ping packet // Expect the PINGRESP within 2 seconds of the PINGREQ // being sent TimerCountdownMS(&c->pingresp_timer, 2000 ); c->ping_outstanding = 1; } } } exit: return rc; } void MQTTCleanSession(MQTTClient* c) { int i = 0; for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) c->messageHandlers[i].topicFilter = NULL; } void MQTTCloseSession(MQTTClient* c) { c->ping_outstanding = 0; c->isconnected = 0; if (c->cleansession) MQTTCleanSession(c); } int cycle(MQTTClient* c, Timer* timer) { int len = 0, rc = SUCCESS; int packet_type = readPacket(c, timer); /* read the socket, see what work is due */ switch (packet_type) { default: /* no more data to read, unrecoverable. 
Or read packet fails due to unexpected network error */ rc = packet_type; goto exit; case 0: /* timed out reading packet */ break; case CONNACK: case PUBACK: case SUBACK: case UNSUBACK: break; case PUBLISH: { MQTTString topicName; MQTTMessage msg; int intQoS; msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) goto exit; msg.qos = (enum QoS)intQoS; deliverMessage(c, &topicName, &msg); if (msg.qos != QOS0) { if (msg.qos == QOS1) len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); else if (msg.qos == QOS2) len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); if (len readbuf, c->readbuf_size) != 1) rc = FAILURE; else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) ping_outstanding = 0; break; } if (keepalive(c) != SUCCESS) { //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT rc = FAILURE; } exit: if (rc == SUCCESS) rc = packet_type; else if (c->isconnected) MQTTCloseSession(c); return rc; } int MQTTYield(MQTTClient* c, int timeout_ms) { int rc = SUCCESS; Timer timer; TimerInit(&timer); TimerCountdownMS(&timer, timeout_ms); do { if (cycle(c, &timer) < 0) { rc = FAILURE; break; } } while (!TimerIsExpired(&timer)); return rc; } int MQTTIsConnected(MQTTClient* client) { return client->isconnected; } void MQTTRun(void* parm) { Timer timer; MQTTClient* c = (MQTTClient*)parm; TimerInit(&timer); while (1) { #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */ cycle(c, &timer); #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif } } #if defined(MQTT_TASK) int MQTTStartTask(MQTTClient* client) { return ThreadStart(&client->thread, &MQTTRun, client); } #endif int 
waitfor(MQTTClient* c, int packet_type, Timer* timer) { int rc = FAILURE; do { if (TimerIsExpired(timer)) break; // we timed out rc = cycle(c, timer); } while (rc != packet_type && rc >= 0); return rc; } int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data) { Timer connect_timer; int rc = FAILURE; MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; int len = 0; #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif if (c->isconnected) /* don't send connect packet again if we are already connected */ goto exit; TimerInit(&connect_timer); TimerCountdownMS(&connect_timer, c->command_timeout_ms); if (options == 0) options = &default_options; /* set default options if none were supplied */ c->keepAliveInterval = options->keepAliveInterval; c->cleansession = options->cleansession; TimerCountdown(&c->last_received, c->keepAliveInterval); if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) rc = 0; data->sessionPresent = 0; if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) rc = data->rc; else rc = FAILURE; } else rc = FAILURE; exit: if (rc == SUCCESS) { c->isconnected = 1; c->ping_outstanding = 0; } #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif return rc; } int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) { MQTTConnackData data; return MQTTConnectWithResults(c, options, &data); } int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler) { int rc = FAILURE; int i = -1; /* first check for an existing matching slot */ for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0) { if (messageHandler == NULL) /* remove existing */ { c->messageHandlers[i].topicFilter = NULL; c->messageHandlers[i].fp = NULL; } rc = SUCCESS; /* return i when adding new subscription */ break; } } /* if no existing, look for 
empty slot (unless we are removing) */ if (messageHandler != NULL) { if (rc == FAILURE) { for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { if (c->messageHandlers[i].topicFilter == NULL) { rc = SUCCESS; break; } } } if (i < MAX_MESSAGE_HANDLERS) { c->messageHandlers[i].topicFilter = topicFilter; c->messageHandlers[i].fp = messageHandler; } } return rc; } int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler, MQTTSubackData* data) { int rc = FAILURE; Timer timer; int len = 0; MQTTString topic = MQTTString_initializer; topic.cstring = (char *)topicFilter; #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif if (!c->isconnected) goto exit; TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos); if (len grantedQoS = QOS0; if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1) { if (data->grantedQoS != 0x80) rc = MQTTSetMessageHandler(c, topicFilter, messageHandler); } } else rc = FAILURE; exit: if (rc == FAILURE) MQTTCloseSession(c); #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif return rc; } int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler) { MQTTSubackData data; return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data); } int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) { int rc = FAILURE; Timer timer; MQTTString topic = MQTTString_initializer; topic.cstring = (char *)topicFilter; int len = 0; #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif if (!c->isconnected) goto exit; TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) readbuf, c->readbuf_size) == 1) { /* remove the subscription message handler associated with this topic, if there is one */ 
MQTTSetMessageHandler(c, topicFilter, NULL); } } else rc = FAILURE; exit: if (rc == FAILURE) MQTTCloseSession(c); #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif return rc; } int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message) { int rc = FAILURE; Timer timer; MQTTString topic = MQTTString_initializer; topic.cstring = (char *)topicName; int len = 0; #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif if (!c->isconnected) goto exit; TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); if (message->qos == QOS1 || message->qos == QOS2) message->id = getNextPacketId(c); len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, topic, (unsigned char*)message->payload, message->payloadlen); if (len qos == QOS1) { if (waitfor(c, PUBACK, &timer) == PUBACK) { unsigned short mypacketid; unsigned char dup, type; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) rc = FAILURE; } else rc = FAILURE; } else if (message->qos == QOS2) { if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) { unsigned short mypacketid; unsigned char dup, type; if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) rc = FAILURE; } else rc = FAILURE; } exit: if (rc == FAILURE) MQTTCloseSession(c); #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif return rc; } int MQTTDisconnect(MQTTClient* c) { int rc = FAILURE; Timer timer; // we might wait for incomplete incoming publishes to complete int len = 0; #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); len = MQTTSerialize_disconnect(c->buf, c->buf_size); if (len > 0) rc = sendPacket(c, len, &timer); // send the disconnect packet MQTTCloseSession(c); #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif return rc; } 2.4.2 MQTTClient.h /******************************************************************************* * Copyright (c) 2014, 2017 IBM 
Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation * Ian Craggs - documentation and platform specific header * Ian Craggs - add setMessageHandler function *******************************************************************************/ #if !defined(MQTT_CLIENT_H) #define MQTT_CLIENT_H #if defined(__cplusplus) extern "C" { #endif #if defined(WIN32_DLL) || defined(WIN64_DLL) #define DLLImport __declspec(dllimport) #define DLLExport __declspec(dllexport) #elif defined(LINUX_SO) #define DLLImport extern #define DLLExport __attribute__ ((visibility ("default"))) #else #define DLLImport #define DLLExport #endif #include "liteOS/MQTTLiteOS.h" #include "MQTTPacket.h" #if defined(MQTTCLIENT_PLATFORM_HEADER) /* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value * into a string constant suitable for use with include. */ #define xstr(s) str(s) #define str(s) #s #include xstr(MQTTCLIENT_PLATFORM_HEADER) #endif #define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */ #if !defined(MAX_MESSAGE_HANDLERS) #define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */ #endif enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 }; /* all failure return codes must be negative */ enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; /* The Platform specific header must define the Network and Timer structures and functions * which operate on them. 
* typedef struct Network { int (*mqttread)(Network*, unsigned char* read_buffer, int, int); int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int); } Network;*/ /* The Timer structure must be defined in the platform specific header, * and have the following functions to operate on it. */ extern void TimerInit(Timer*); extern char TimerIsExpired(Timer*); extern void TimerCountdownMS(Timer*, unsigned int); extern void TimerCountdown(Timer*, unsigned int); extern int TimerLeftMS(Timer*); typedef struct MQTTMessage { enum QoS qos; unsigned char retained; unsigned char dup; unsigned short id; void *payload; size_t payloadlen; } MQTTMessage; typedef struct MessageData { MQTTMessage* message; MQTTString* topicName; } MessageData; typedef struct MQTTConnackData { unsigned char rc; unsigned char sessionPresent; } MQTTConnackData; typedef struct MQTTSubackData { enum QoS grantedQoS; } MQTTSubackData; typedef void (*messageHandler)(MessageData*); typedef struct MQTTClient { unsigned int next_packetid, command_timeout_ms; size_t buf_size, readbuf_size; unsigned char *buf, *readbuf; unsigned int keepAliveInterval; char ping_outstanding; int isconnected; int cleansession; struct MessageHandlers { const char* topicFilter; void (*fp) (MessageData*); } messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */ void (*defaultMessageHandler) (MessageData*); Network* ipstack; Timer last_sent, last_received, pingresp_timer; #if defined(MQTT_TASK) Mutex mutex; Thread thread; #endif } MQTTClient; #define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0} /** * Create an MQTT client object * @param client * @param network * @param command_timeout_ms * @param */ DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms, unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size); /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The 
nework object must be connected to the network endpoint before calling this * @param options - connect options * @return success code */ DLLExport int MQTTConnectWithResults(MQTTClient* client, MQTTPacket_connectData* options, MQTTConnackData* data); /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The nework object must be connected to the network endpoint before calling this * @param options - connect options * @return success code */ DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options); /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs * @param client - the client object to use * @param topic - the topic to publish to * @param message - the message to send * @return success code */ DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*); /** MQTT SetMessageHandler - set or remove a per topic message handler * @param client - the client object to use * @param topicFilter - the topic filter set the message handler for * @param messageHandler - pointer to the message handler function or NULL to remove * @return success code */ DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler); /** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. * @param client - the client object to use * @param topicFilter - the topic filter to subscribe to * @param message - the message to send * @return success code */ DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler); /** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. 
* @param client - the client object to use * @param topicFilter - the topic filter to subscribe to * @param message - the message to send * @param data - suback granted QoS returned * @return success code */ DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler, MQTTSubackData* data); /** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning. * @param client - the client object to use * @param topicFilter - the topic filter to unsubscribe from * @return success code */ DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter); /** MQTT Disconnect - send an MQTT disconnect packet and close the connection * @param client - the client object to use * @return success code */ DLLExport int MQTTDisconnect(MQTTClient* client); /** MQTT Yield - MQTT background * @param client - the client object to use * @param time - the time, in milliseconds, to yield for * @return success code */ DLLExport int MQTTYield(MQTTClient* client, int time); /** MQTT isConnected * @param client - the client object to use * @return truth value indicating whether the client is connected to the server */ DLLExport int MQTTIsConnected(MQTTClient* client); #if defined(MQTT_TASK) /** MQTT start background thread for a client. After this, MQTTYield should not be called. * @param client - the client object to use * @return success code */ DLLExport int MQTTStartTask(MQTTClient* client); #endif #if defined(__cplusplus) } #endif #endif 三、API说明

业务BUILD.gn中包含路径

include_dirs = [ "//utils/native/lite/include", "//kernel/liteos_m/components/cmsis/2.0", "//base/iot_hardware/interfaces/kits/wifiiot_lite", "//foundation/communication/interfaces/kits/wifi_lite/wifiservice", "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include/", "//third_party/paho_mqtt/MQTTPacket/src", "//third_party/paho_mqtt/MQTTClient-C/src", ]

以下网络接口位于 third_party\paho_mqtt\MQTTClient-C\src\liteOS\MQTTLiteOS.h。

3.1 NetworkInit
功能 初始化网络结构体,包括socket描述符、socket发送和接收函数
函数定义 void NetworkInit(Network* n)
参数 n:网络结构体
返回 无
3.2 NetworkConnect
功能 连接到指定的MQTT服务器地址和端口号
函数定义 int NetworkConnect(Network* n, char* addr, int port)
参数 n:网络结构体;addr:服务器地址(IP或主机名);port:端口
返回 0 - 成功,-1 - 失败
3.3 NetworkDisconnect
功能 关闭网络连接
函数定义 void NetworkDisconnect(Network* n)
参数 n:网络结构体
返回 无

以下 MQTT 接口位于 third_party\paho_mqtt\MQTTClient-C\src\MQTTClient.h。

3.4 MQTTClientInit
功能 创建一个MQTT客户端对象
函数定义 void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
参数 c:MQTT客户端;network:网络结构体;command_timeout_ms:命令超时时间(毫秒);sendbuf:发送缓冲区;sendbuf_size:发送缓冲区大小;readbuf:接收缓冲区;readbuf_size:接收缓冲区大小
返回 无
3.5 MQTTConnect
功能 发送MQTT连接数据包并等待CONNACK
函数定义 int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
参数 c:MQTT客户端;options:连接参数
返回 0 - 成功,非0 - 失败(错误码)
3.6 MQTTPublish
功能 发送MQTT发布数据包
函数定义 int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
参数 c:MQTT客户端;topicName:主题;message:消息
返回 0 - 成功,非0 - 失败(错误码)
3.7 MQTTSubscribe
功能 发送MQTT订阅数据包并等待SUBACK
函数定义 int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
参数 c:MQTT客户端;topicFilter:主题过滤器;qos:服务质量;messageHandler:消息处理函数
返回 0 - 成功,非0 - 失败(错误码)
3.8 MQTTDisconnect
功能 发送MQTT断开连接数据包并关闭连接
函数定义 int MQTTDisconnect(MQTTClient* c)
参数 c:MQTT客户端
返回 0 - 成功,非0 - 失败(错误码)

四、MQTT客户端 4.1 配置SSID和密码连接WIFI

wifi_connect.c

#include #include #include #include "lwip/netif.h" #include "lwip/netifapi.h" #include "lwip/ip4_addr.h" #include "lwip/api_shell.h" #include "cmsis_os2.h" #include "hos_types.h" #include "wifi_device.h" #include "wifiiot_errno.h" #include "ohos_init.h" #define DEF_TIMEOUT 15 #define ONE_SECOND 1 #define SELECT_WIFI_SECURITYTYPE WIFI_SEC_TYPE_PSK static void WiFiInit(void); static void WaitSacnResult(void); static int WaitConnectResult(void); static void OnWifiScanStateChangedHandler(int state, int size); static void OnWifiConnectionChangedHandler(int state, WifiLinkedInfo *info); static void OnHotspotStaJoinHandler(StationInfo *info); static void OnHotspotStateChangedHandler(int state); static void OnHotspotStaLeaveHandler(StationInfo *info); static int g_staScanSuccess = 0; static int g_ConnectSuccess = 0; static int ssid_count = 0; WifiEvent g_wifiEventHandler = {0}; WifiErrorCode error; #define SELECT_WLAN_PORT "wlan0" int WifiConnect(const char *ssid, const char *psk) { WifiScanInfo *info = NULL; unsigned int size = WIFI_SCAN_HOTSPOT_LIMIT; static struct netif *g_lwip_netif = NULL; osDelay(200); printf("\r\n"); //初始化WIFI WiFiInit(); //使能WIFI if (EnableWifi() != WIFI_SUCCESS) { printf("EnableWifi failed, error = %d\r\n", error); return -1; } //判断WIFI是否激活 if (IsWifiActive() == 0) { printf("Wifi station is not actived.\r\n"); return -1; } //分配空间,保存WiFi信息 info = malloc(sizeof(WifiScanInfo) * WIFI_SCAN_HOTSPOT_LIMIT); if (info == NULL) { return -1; } //轮询查找WiFi列表 do{ //重置标志位 ssid_count = 0; g_staScanSuccess = 0; //开始扫描 Scan(); //等待扫描结果 WaitSacnResult(); //获取扫描列表 error = GetScanInfoList(info, &size); }while(g_staScanSuccess != 1); //打印WiFi列表 printf("********************\r\n"); for(uint8_t i = 0; i < ssid_count; i++) { printf("no:%03d, ssid:%-30s, rssi:%5d\r\n", i+1, info[i].ssid, info[i].rssi/100); } printf("********************\r\n"); //连接指定的WiFi热点 for(uint8_t i = 0; i < ssid_count; i++) { if (strcmp(ssid, info[i].ssid) == 0) { int result; printf("Select:%3d 
wireless, Waiting...\r\n", i+1); //拷贝要连接的热点信息 WifiDeviceConfig select_ap_config = {0}; strcpy(select_ap_config.ssid, info[i].ssid); strcpy(select_ap_config.preSharedKey, psk); select_ap_config.securityType = SELECT_WIFI_SECURITYTYPE; if (AddDeviceConfig(&select_ap_config, &result) == WIFI_SUCCESS) { if (ConnectTo(result) == WIFI_SUCCESS && WaitConnectResult() == 1) { printf("WiFi connect succeed!\r\n"); g_lwip_netif = netifapi_netif_find(SELECT_WLAN_PORT); break; } } } if(i == ssid_count-1) { printf("ERROR: No wifi as expected\r\n"); while(1) osDelay(100); } } //启动DHCP if (g_lwip_netif) { dhcp_start(g_lwip_netif); printf("begain to dhcp\r\n"); } //等待DHCP for(;;) { if(dhcp_is_bound(g_lwip_netif) == ERR_OK) { printf("\r\n"); //打印获取到的IP信息 netifapi_netif_common(g_lwip_netif, dhcp_clients_info_show, NULL); break; } printf("\r\n"); osDelay(100); } osDelay(100); return 0; } static void WiFiInit(void) { printf("\r\n"); g_wifiEventHandler.OnWifiScanStateChanged = OnWifiScanStateChangedHandler; g_wifiEventHandler.OnWifiConnectionChanged = OnWifiConnectionChangedHandler; g_wifiEventHandler.OnHotspotStaJoin = OnHotspotStaJoinHandler; g_wifiEventHandler.OnHotspotStaLeave = OnHotspotStaLeaveHandler; g_wifiEventHandler.OnHotspotStateChanged = OnHotspotStateChangedHandler; error = RegisterWifiEvent(&g_wifiEventHandler); if (error != WIFI_SUCCESS) { printf("register wifi event fail!\r\n"); } else { printf("register wifi event succeed!\r\n"); } } static void OnWifiScanStateChangedHandler(int state, int size) { if (size > 0) { ssid_count = size; g_staScanSuccess = 1; } printf("callback function for wifi scan:%d, %d\r\n", state, size); return; } static void OnWifiConnectionChangedHandler(int state, WifiLinkedInfo *info) { if (info == NULL) { printf("WifiConnectionChanged:info is null, stat is %d.\n", state); } else { if (state == WIFI_STATE_AVALIABLE) { g_ConnectSuccess = 1; } else { g_ConnectSuccess = 0; } } } static void OnHotspotStaJoinHandler(StationInfo *info) { (void)info; 
printf("STA join AP\n"); return; } static void OnHotspotStaLeaveHandler(StationInfo *info) { (void)info; printf("HotspotStaLeave:info is null.\n"); return; } static void OnHotspotStateChangedHandler(int state) { printf("HotspotStateChanged:state is %d.\n", state); return; } static void WaitSacnResult(void) { int scanTimeout = DEF_TIMEOUT; while (scanTimeout > 0) { sleep(ONE_SECOND); scanTimeout--; if (g_staScanSuccess == 1) { printf("WaitSacnResult:wait success[%d]s\n", (DEF_TIMEOUT - scanTimeout)); break; } } if (scanTimeout 0) { sleep(ONE_SECOND); ConnectTimeout--; if (g_ConnectSuccess == 1) { printf("WaitConnectResult:wait success[%d]s\n", (DEF_TIMEOUT - ConnectTimeout)); break; } } if (ConnectTimeout topicName->lenstring.len, data->topicName->lenstring.data, data->message->payloadlen, data->message->payload); } printf("MQTTSubscribe ...\n"); rc = MQTTSubscribe(&client, "substopic", 2, messageArrived); 4.6 发送消息 MQTTMessage message; char payload[30]; message.qos = 2; message.retained = 0; message.payload = payload; sprintf(payload, "message number %d", count); message.payloadlen = strlen(payload); rc = MQTTPublish(&client, "pubtopic", &message); 4.7 完整代码 #include #include #include #include "ohos_init.h" #include "cmsis_os2.h" #include "wifi_connect.h" #include "MQTTClient.h" static unsigned char sendBuf[1000]; static unsigned char readBuf[1000]; Network network; void messageArrived(MessageData* data) { printf("Message arrived on topic %.*s: %.*s\n", data->topicName->lenstring.len, data->topicName->lenstring.data, data->message->payloadlen, data->message->payload); } /* */ static void MQTT_DemoTask(void) { WifiConnect("test","12345678"); // 修改为要连接的WIFI printf("Starting ...\n"); int rc, count = 0; MQTTClient client; NetworkInit(&network); printf("NetworkConnect ...\n"); begin: NetworkConnect(&network, "192.168.31.225", 1883);// 修改为要连接的电脑IP printf("MQTTClientInit ...\n"); MQTTClientInit(&client, &network, 2000, sendBuf, sizeof(sendBuf), readBuf, sizeof(readBuf)); 
MQTTString clientId = MQTTString_initializer; clientId.cstring = "bearpi"; MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.clientID = clientId; data.willFlag = 0; data.MQTTVersion = 3; data.keepAliveInterval = 0; data.cleansession = 1; printf("MQTTConnect ...\n"); rc = MQTTConnect(&client, &data); if (rc != 0) { printf("MQTTConnect: %d\n", rc); NetworkDisconnect(&network); MQTTDisconnect(&client); osDelay(200); goto begin; } printf("MQTTSubscribe ...\n"); rc = MQTTSubscribe(&client, "substopic", 2, messageArrived); if (rc != 0) { printf("MQTTSubscribe: %d\n", rc); osDelay(200); goto begin; } while (++count) { MQTTMessage message; char payload[30]; message.qos = 2; message.retained = 0; message.payload = payload; sprintf(payload, "message number %d", count); message.payloadlen = strlen(payload); if ((rc = MQTTPublish(&client, "pubtopic", &message)) != 0){ printf("Return code from MQTT publish is %d\n", rc); NetworkDisconnect(&network); MQTTDisconnect(&client); goto begin; } osDelay(50); } } static void MQTT_Demo(void) { osThreadAttr_t attr; attr.name = "MQTT_DemoTask"; attr.attr_bits = 0U; attr.cb_mem = NULL; attr.cb_size = 0U; attr.stack_mem = NULL; attr.stack_size = 10240; attr.priority = osPriorityNormal; if (osThreadNew((osThreadFunc_t)MQTT_DemoTask, NULL, &attr) == NULL) { printf("[MQTT_Demo] Falied to create MQTT_DemoTask!\n"); } } APP_FEATURE_INIT(MQTT_Demo); 五、搭建本地MQTT服务器

EMQ官网下载:https://www.emqx.com/zh/downloads?product=broker

下载EMQ X开源版

解压后进入 emqx-windows-4.3.8\emqx\bin 目录(路径不要带有空格或中文);Shift+右键在此处打开 Powershell 窗口,输入命令 emqx start;然后打开浏览器,输入 http://127.0.0.1:18083/,账号 admin,密码 public,进入管理界面。

工具 - Websocket,选择连接

订阅主题和发布消息

六、运行测试

调试打印:

服务器查看:

• 由 Leung 写于 2021 年 4 月 4 日

• 参考:【鸿蒙2.0设备开发教程】小熊派HarmonyOS 鸿蒙·季 开发教程     OpenHarmony轻量系统开发【11】移植MQTT     在鸿蒙系统上使用MQTT编程



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3